package gu.simplemq.stomp;

import java.util.Iterator;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

import org.projectodd.stilts.stomp.StompException;
import org.projectodd.stilts.stomp.StompMessage;
import org.projectodd.stilts.stomp.Subscription.AckMode;
import org.projectodd.stilts.stomp.client.ClientSubscription;
import org.projectodd.stilts.stomp.client.MessageHandler;
import org.projectodd.stilts.stomp.client.StompClient;
import org.projectodd.stilts.stomp.client.SubscriptionBuilder;

import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import gu.simplemq.IAdvisor;
import gu.simplemq.json.BaseJsonEncoder;
import static com.google.common.base.Preconditions.*;
/**
 * 通过订阅'/topic/ActiveMQ.Advisory.Consumer.Topic','/topic/ActiveMQ.Advisory.Consumer.Queue'
 * 统计每个频道/队列订阅/消费者数量(线程安全)
 * @author guyadong
 * @deprecated
 */
public class AdvisoryMessageManager implements AutoCloseable,IAdvisor,StompConstants,MessageHandler{
	private static final String FIELD_ConsumerInfo = "ConsumerInfo";
	private static final String FIELD_RemoveInfo = "RemoveInfo";
	private static final String TOPIC_PREIFX = "/topic/ActiveMQ.Advisory.Consumer.Topic.";
	private static final String QUEUE_PREIFX = "/topic/ActiveMQ.Advisory.Consumer.Queue.";

	private static final String ADVISORY_CONSUMER_TOPIC_PATTERN = TOPIC_PREIFX + "*";
	private static final String ADVISORY_CONSUMER_QUEUE_PATTERN = QUEUE_PREIFX + "*";
	private final ConcurrentMap<String, Set<String>> advisoryConsumers = Maps.newConcurrentMap();
	private static final LoadingCache<StompPoolLazy,AdvisoryMessageManager> CACHE = CacheBuilder.newBuilder().build(new CacheLoader<StompPoolLazy, AdvisoryMessageManager>(){

		@Override
		public AdvisoryMessageManager load(StompPoolLazy key) throws Exception {
			return new AdvisoryMessageManager(key);
		}});
	private final StompClient stompClient;
	private final StompPoolLazy pool;
	private final ClientSubscription topicSub;
	private final ClientSubscription queueSub;
	private AdvisoryMessageManager(StompPoolLazy pool) {
		this.pool = checkNotNull(pool, "pool is null");
		stompClient = this.pool.borrow();
		topicSub = sub(ADVISORY_CONSUMER_TOPIC_PATTERN);
		queueSub = sub(ADVISORY_CONSUMER_QUEUE_PATTERN);
	}
	ClientSubscription sub (String dest){
		SubscriptionBuilder builder = stompClient.subscribe( dest )
		        .withMessageHandler( this )
		        .withAckMode( AckMode.CLIENT_INDIVIDUAL );
		for(Entry<String, String> entry:((StompPoolLazy)pool).getHeaders().entrySet()){
			builder.withHeader(entry.getKey(), entry.getValue());
		}
		try {
			return builder.start();
		} catch (StompException e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public void close() {
		synchronized (this) {
			try {
				if(stompClient.isConnected()){
					if(topicSub.isActive()){
						topicSub.unsubscribe();
					}
					if(queueSub.isActive()){
						queueSub.unsubscribe();
					}
					this.pool.release(stompClient);
				}
				CACHE.asMap().remove(pool);
			} catch (StompException e) {
				e.printStackTrace();
			}

		}
	}
	
	@Override
	public int consumerCountOf(String channelName) {
		Set<String> message = advisoryConsumers.get(TOPIC_PREIFX + "Queue." + channelName);
		return message == null ? 0 : message.size();
	}
	@Override
	public int subscriberCountOf(String channelName) {
		Set<String> message = advisoryConsumers.get(TOPIC_PREIFX + channelName);
		return message == null ? 0 : message.size();
	}
	public static AdvisoryMessageManager instanceOf(StompPoolLazy pool){
		return CACHE.getUnchecked(pool);
	}
	/**
	 * 关闭并删除所有资源池中的{@link AdvisoryMessageManager}实例
	 */
	public synchronized static void closeAll(){
		for(Iterator<AdvisoryMessageManager> itor = CACHE.asMap().values().iterator();itor.hasNext();){
			AdvisoryMessageManager p = itor.next();
			itor.remove();
			p.close();
		}
	}
	@Override
	public void handle(StompMessage message) {

		JSONObject json = BaseJsonEncoder.getEncoder().fromJson(message.getContentAsString(),JSONObject.class);
		if(json.containsKey(FIELD_ConsumerInfo)){
			JSONObject consumInfo = json.getObject(FIELD_ConsumerInfo,JSONObject.class);
			String connectionId = consumInfo.getObject("consumerId", JSONObject.class).getString("connectionId");
			advisoryConsumers.putIfAbsent(message.getDestination(), Sets.<String>newHashSet());
			advisoryConsumers.get(message.getDestination()).add(connectionId);
		}else if(json.containsKey(FIELD_RemoveInfo)){
			JSONObject removeInfo = json.getObject(FIELD_RemoveInfo,JSONObject.class);
			String connectionId = removeInfo.getObject("objectId", JSONObject.class).getString("connectionId");
			advisoryConsumers.putIfAbsent(message.getDestination(), Sets.<String>newHashSet());
			advisoryConsumers.get(message.getDestination()).remove(connectionId);
		}
		synchronized (this) {
			this.notifyAll();
		}
	}
}
